-
Notifications
You must be signed in to change notification settings - Fork 3
/
Glue-Job-Script
executable file
·98 lines (80 loc) · 3.77 KB
/
Glue-Job-Script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
sqlContext = SQLContext(sc)
##------------------> Job picks up the parameter at run time.
args = getResolvedOptions(sys.argv, ['MyBatchID'])
##--------------> Glue converting to parquet from s3 location dynamically identified using job parameter
import json
jsonDataFrame = sqlContext.read.json("s3://glue.sneaql.full360.com/beers/batch=" + args['MyBatchID'] + "/*.json.gz")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C0", "beer_event_id")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C1", "drinking_session_id")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C2", "user_id")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C3", "beer")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C4", "schmooziest_buzzword")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C5", "best_thing_said")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C6", "worst_thing_said")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C7", "drunken_babble")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C8", "likes")
jsonDataFrame = jsonDataFrame.withColumnRenamed("C9", "beer_opened_time")
jsonDataFrame = jsonDataFrame.select(["beer_event_id","drinking_session_id","user_id","beer","schmooziest_buzzword","best_thing_said","worst_thing_said","drunken_babble","likes" ,"beer_opened_time"])
jsonDataFrame.write.parquet("s3://glue.sneaql.full360.com/beers-parquet/batch=" + args['MyBatchID'])
##--------------> Glue placing the parquet file in s3 location dynamically created using job parameter
##--------------> prepare boto3 import
##--------------> boto3 does not work out of a zip file and needs to be extracted.
import zipfile
zip_ref = zipfile.ZipFile('virtualenv_python.zip', 'r')
zip_ref.extractall('./tmp/packages')
zip_ref.close()
sys.path.insert(0, './tmp/packages')
##--------------> Glue providing batch Id in the logs
print('MyBatchID--->>>')
print(args['MyBatchID'])
##-------------->Glue initiating SneaQL container
import boto3
# create aws api client for ecs
client = boto3.client('ecs' , region_name='us-east-1')
# run client to trigger sneaql ecs task
# note the parameters passed to sneaql via
# environment variable overrides
response = client.run_task(
cluster='sneaql-ritu',
taskDefinition='sneaql-ritu-task:1',
overrides={
'containerOverrides': [
{
'name': 'sneaql-ritu-con1',
'environment': [
{
'name': 'SNEAQL_BISCUIT',
'value': 'https://s3-us-west-2.amazonaws.com/secrets-files.full360.com/sneaql-demo/ritu-secret.yml'
},
{
'name': 'SNEAQL_GIT_REPO',
'value': 'https://git-codecommit.us-east-1.amazonaws.com/v1/repos/sneaql-ritu'
},
{
'name': 'BATCHID',
'value': args['MyBatchID']
},
{
'name': 'SNEAQL_GIT_REPO_BRANCH',
'value': 'master'
}
]
}
]
}
)
print(response)
## SneaQL container initiation -------> END
job.commit()